package cn.zhaopin.starter.mq.core;

import cn.zhaopin.starter.mq.common.PulsarMessage;
import cn.zhaopin.starter.mq.properties.PulsarProperties;
import cn.zhaopin.starter.mq.support.PulsarMessageUtils;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.env.Environment;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.core.AbstractMessageSendingTemplate;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.MimeTypeUtils;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * Description:tdmq模板
 *
 * @author: zuomin (myleszelic@outlook.com)
 * @date: 2021/07/16-11:07
 */
@SuppressWarnings({"unchecked", "NullableProblems", "rawtypes"})
public class PulsarTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {

    private static final Logger log = LoggerFactory.getLogger(PulsarTemplate.class);

    private PulsarClient client;
    private PulsarProperties pulsarProperties;
    private Environment environment;
    private MessageConverter messageConverter;

    public PulsarTemplate(PulsarClient client, PulsarProperties pulsarProperties) {
        this.client = client;
        this.pulsarProperties = pulsarProperties;
    }

    private PulsarProducerFactory factory;

    /** 生产者map*/
    private final Map<String, Producer<PulsarMessage>> producerFactories = new ConcurrentHashMap<>(1 << 4);

    @Override
    protected void doSend(String destination, Message<?> message) {
        MessageId messageId = sendSync(destination, message);
        if (log.isDebugEnabled()) {
            log.debug("send message to `{}` finished. result:{}", destination, messageId.toString());
        }
    }

    /**
     * <p>Send message in synchronous mode</p>
     *
     * @param topic topic
     * @param message message body
     * @return org.apache.pulsar.client.api.MessageId
     *
     */
    public  MessageId sendSync(String topic, Message message) {
        try {

            Producer<PulsarMessage> producer = getAvailableProducer(topic);

            return producer.send(this.createPulsarMessage(topic, message));
        } catch (PulsarClientException e) {
            log.error("send sync message to `{}` error. exception message:{}", topic, e.getMessage());
            e.printStackTrace();
        }
        return null;
    }

    private PulsarMessage createPulsarMessage(String topic, Message message) {
        Message<?> msg = this.doConvert(message.getPayload(), message.getHeaders(), null);
        // Do not use getMessageConvert(), because don't want to use getMessageConverter.doConvert()
        return PulsarMessageUtils.convertToPulsarMessage(this.messageConverter, msg);
    }

    /**
     * <p>Send message to broker asynchronously</p>
     *
     * @param topic topic
     * @param message message body
     * @return org.apache.pulsar.client.api.MessageId
     */
    public CompletableFuture<MessageId> sendAsync(String topic, Message message) {
        Producer<PulsarMessage> producer = getAvailableProducer(topic);
        return producer.sendAsync(this.createPulsarMessage(topic, message));
    }

    /**
     * <p>Send timed message sync</p>
     *
     * @param topic topic
     * @param message message
     * @param deliverAt deliver at
     * @return org.apache.pulsar.client.api.MessageId
     */
    public MessageId sendTimed(String topic, Message message, LocalDateTime deliverAt) {
        try {
            Assert.notNull(deliverAt, "deliverAt can't be null, please set property");
            Producer<PulsarMessage> producer = getAvailableProducer(topic);
            return producer.newMessage().value(this.createPulsarMessage(topic, message))
                    .deliverAt(deliverAt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli())
                    .send();
        } catch (PulsarClientException e) {
            log.error("send timed message error, topic:{}, message: {}", topic, e.getMessage());
            throw new MessagingException(e.getMessage(), e);
        }
    }

    /**
     * <p>Send timed message async</p>
     * 定时消息的时间范围为当前时间开始计算，864000秒（10天）以内的任意时刻。如10月1日12:00开始，最长可以设置到10月11日12:00。
     *
     * @param topic topic
     * @param message message
     * @param deliverAt deliver at
     * @return org.apache.pulsar.client.api.MessageId
     */
    public CompletableFuture<MessageId> sendTimedAsync(String topic, Message message, LocalDateTime deliverAt) {
        Assert.notNull(deliverAt, "deliverAt can't be null, please set property");
        Producer producer = getAvailableProducer(topic);
        return producer.newMessage().value(this.createPulsarMessage(topic, message))
                .deliverAt(deliverAt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli())
                .sendAsync()
                .exceptionally(e -> {
                    log.error("send timed message error, topic:{}", topic);
                    return null;
                });
    }

    /**
     * <p>Send delay message sync</p>
     * 延时消息的时长取值范围为0 - 864000秒（0秒 - 10天）。如10月1日12:00开始，最长可以设置864000秒。如果设置的时间超过这个时间，则直接按864000秒计算，到时会直接投递。
     *
     * @param topic topic
     * @param message message
     * @param deliverAfter deliver after
     * @return org.apache.pulsar.client.api.MessageId
     */
    public MessageId sendDelay(String topic, Message message, Long deliverAfter, TimeUnit timeUnit) {
        try {
            Assert.notNull(deliverAfter, "deliverAfter can't be null, please set property");
            Producer<PulsarMessage> producer = getAvailableProducer(topic);
            return producer.newMessage().value(this.createPulsarMessage(topic, message))
                    .deliverAfter(deliverAfter, timeUnit)
                    .send();
        } catch (PulsarClientException e) {
            log.error("send delay message error, topic:{}, message: {}", topic, e.getMessage());
            throw new MessagingException(e.getMessage(), e);
        }
    }

    /**
     * <p>Send delay message async</p>
     * 延时消息的时长取值范围为0 - 864000秒（0秒 - 10天）。如10月1日12:00开始，最长可以设置864000秒。如果设置的时间超过这个时间，则直接按864000秒计算，到时会直接投递。
     *
     * @param topic topic
     * @param message message
     * @param deliverAfter deliver after
     * @return org.apache.pulsar.client.api.MessageId
     */
    public CompletableFuture<MessageId> sendDelayAsync(String topic, Message message, Long deliverAfter, TimeUnit timeUnit) {
        Assert.notNull(deliverAfter, "deliverAfter can't be null, please set property");
        Producer producer = getAvailableProducer(topic);
        return producer.newMessage().value(this.createPulsarMessage(topic, message))
                .deliverAfter(deliverAfter, timeUnit)
                .sendAsync()
                .exceptionally(e -> {
                    log.error("send timed message error, topic:{}", topic);
                    return null;
                });
    }

    private Producer<PulsarMessage> getAvailableProducer(String topic) {
        String producerName = factory.getProducerName(topic);
        Producer<PulsarMessage> producer = producerFactories.get(producerName);
        if(Objects.isNull(producer)) {
            producer = factory.create(topic);
            producerFactories.put(producerName, producer);
        }
        return producer;
    }

    @Override
    public void destroy() throws Exception {
        if (!CollectionUtils.isEmpty(producerFactories)) {
            for (Map.Entry<String, Producer<PulsarMessage>> producerEntry : producerFactories.entrySet()) {
                Producer<PulsarMessage> producer = producerEntry.getValue();
                if (Objects.nonNull(producer) && !producer.isConnected()) {
                    producer.close();
                }
            }
            logger.info("Pulsar producer destroy finish");
        }
    }

    @Override
    protected Message<?> doConvert(Object payload, Map<String, Object> headers, MessagePostProcessor postProcessor) {
        Message<?> message = super.doConvert(payload, headers, postProcessor);
        MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
        builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
        return builder.build();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        factory = new PulsarProducerFactory(client, environment, pulsarProperties);
    }

    public PulsarClient getClient() {
        return client;
    }

    public PulsarProperties getTdmqProperties() {
        return pulsarProperties;
    }

    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }

    public void initMessageConvert(MessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }
}
